package flinkstudy.stream;

import flinkstudy.bo.EstateInfo;
import flinkstudy.stream.environment.FlinkStreamExecutionEnvironment;
import flinkstudy.stream.source.mock.DynamicUnlimitedData;
import lombok.Data;
import lombok.SneakyThrows;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.state.MapState;
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.datastream.WindowedStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.streaming.api.functions.sink.SinkFunction;
import org.apache.flink.streaming.api.watermark.Watermark;
import org.apache.flink.streaming.api.windowing.assigners.SlidingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import org.junit.Before;
import org.junit.Test;

import javax.annotation.Nullable;
import java.text.SimpleDateFormat;
import java.util.ArrayList;
import java.util.List;

/**
 * 窗口
 *
 * <pre>
 *
 * Keyed Windows
 *
 * stream
 *        .keyBy(...)               <-  keyed versus non-keyed windows
 *        .window(...)              <-  required: "assigner"
 *       [.trigger(...)]            <-  optional: "trigger" (else default trigger)
 *       [.evictor(...)]            <-  optional: "evictor" (else no evictor)
 *       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
 *       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
 *        .reduce/aggregate/fold/apply()      <-  required: "function"
 *       [.getSideOutput(...)]      <-  optional: "output tag"
 * Non-Keyed Windows
 *
 * stream
 *        .windowAll(...)           <-  required: "assigner"
 *       [.trigger(...)]            <-  optional: "trigger" (else default trigger)
 *       [.evictor(...)]            <-  optional: "evictor" (else no evictor)
 *       [.allowedLateness(...)]    <-  optional: "lateness" (else zero)
 *       [.sideOutputLateData(...)] <-  optional: "output tag" (else no side output for late data)
 *        .reduce/aggregate/fold/apply()      <-  required: "function"
 *       [.getSideOutput(...)]      <-  optional: "output tag"
 * </pre>
 *
 * @author daocr
 * @date 2019/12/19
 */
public class Windows {

    private FlinkStreamExecutionEnvironment flinkStreamExecutionEnvironment = null;


    @Before
    public void init() {
        flinkStreamExecutionEnvironment = new FlinkStreamExecutionEnvironment();
    }

    /**
     * Sliding-window demo: every 10 seconds, aggregates the data of the past day per city,
     * with a continuous trigger firing intermediate results every 5 seconds.
     *
     * <p>NOTE(review): the original comments described a tumbling window over the past 60
     * seconds, but the assigner actually used is {@link SlidingProcessingTimeWindows} with
     * a 1-day size and a 10-second slide. Also, the environment is configured for
     * {@link TimeCharacteristic#EventTime} while the window assigner and trigger are
     * processing-time based — confirm which time semantics are actually intended.
     */
    @Test
    public void tumblingWindows() throws Exception {
        StreamExecutionEnvironment environment = flinkStreamExecutionEnvironment.streamExecutionEnvironment;
        environment.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        environment.setParallelism(1);

        // Window of 1 day, sliding every 10 seconds (processing time).
        SlidingProcessingTimeWindows of = SlidingProcessingTimeWindows.of(Time.days(1), Time.seconds(10));

        WindowedStream<EstateInfo, Tuple, TimeWindow> windowWindowedStream = environment
                .addSource(new DynamicUnlimitedData())
                // Assign event timestamps and emit periodic watermarks.
                .assignTimestampsAndWatermarks(new EstateInfoAssignerWithPeriodicWatermarks())
                .keyBy("cityId")
                .window(of)
                // Fire the window every 5 seconds instead of only at window end.
                .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(5)));

        // Aggregate all EstateInfo records of a window into one CityAccumulator per city.
        SingleOutputStreamOperator<CityAccumulator> aggregate = windowWindowedStream
                .aggregate(new EstateInfoCityAccumulatorCityAccumulatorAggregateFunction());

        // Incremental output: suppress repeated firings whose element count did not change.
        SingleOutputStreamOperator<CityAccumulator> cityAccumulatorSingleOutputStreamOperator = aggregate
                .keyBy("cityId")
                .process(new TupleCityAccumulatorCityAccumulatorKeyedProcessFunction());

        // Sink: print the per-city element count.
        cityAccumulatorSingleOutputStreamOperator.addSink(new SinkFunction<CityAccumulator>() {
            @Override
            public void invoke(CityAccumulator value, Context context) throws Exception {
                System.out.println("cityId : " + value.getCityId() + "    " + value.getList().size());
            }
        });

        environment.execute();

    }


    /** Per-city aggregation result: the city id and all records collected in the window. */
    @Data
    public static class CityAccumulator {
        private Integer cityId;
        private List<EstateInfo> list = new ArrayList<>();
    }


    /**
     * Aggregate function that collects every {@link EstateInfo} of a window into a
     * {@link CityAccumulator}, taking the city id from the first element seen.
     */
    private static class EstateInfoCityAccumulatorCityAccumulatorAggregateFunction implements AggregateFunction<EstateInfo, CityAccumulator, CityAccumulator> {
        @Override
        public CityAccumulator createAccumulator() {
            return new CityAccumulator();
        }

        @Override
        public CityAccumulator add(EstateInfo value, CityAccumulator accumulator) {
            // The first element of a key determines the accumulator's city id.
            if (accumulator.getCityId() == null) {
                accumulator.setCityId(value.getCityId());
            }

            accumulator.getList().add(value);

            return accumulator;
        }

        @Override
        public CityAccumulator getResult(CityAccumulator accumulator) {
            return accumulator;
        }

        @Override
        public CityAccumulator merge(CityAccumulator a, CityAccumulator b) {
            // FIX(review): if "a" is a fresh accumulator its cityId may still be null while
            // "b" carries one — keep whichever id is present so the merged result is usable.
            if (a.getCityId() == null) {
                a.setCityId(b.getCityId());
            }

            a.getList().addAll(b.getList());

            return a;
        }
    }

    /**
     * Emits a window result only when the number of collected records for the city has
     * changed since the last emission. Needed because the continuous trigger re-fires the
     * same (growing) window repeatedly.
     */
    private static class TupleCityAccumulatorCityAccumulatorKeyedProcessFunction extends KeyedProcessFunction<Tuple, CityAccumulator, CityAccumulator> {

        /** Last emitted accumulator per city id, used to suppress unchanged re-firings. */
        private MapState<Integer, CityAccumulator> state;

        @Override
        public void processElement(CityAccumulator value, Context ctx, Collector<CityAccumulator> out) throws Exception {
            Integer cityId = value.getCityId();
            CityAccumulator cache = state.get(cityId);

            // Forward only the first result for a city, or a result whose size grew.
            if (cache == null || cache.getList().size() != value.getList().size()) {
                out.collect(value);
                state.put(cityId, value);
            }
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);

            state = this.getRuntimeContext().getMapState(new MapStateDescriptor<>(
                    "city_cut",
                    Integer.class,
                    CityAccumulator.class)
            );
        }

        // FIX(review): the original close() called state.clear(), but keyed state must not
        // be accessed outside a keyed context (there is no "current key" during close()),
        // which can fail at shutdown and at best clears only the last active key. Flink
        // manages keyed-state lifecycle itself, so no override of close() is needed.
    }

    /**
     * Extracts event timestamps from {@link EstateInfo#getCreateTimeFormat()} (pattern
     * "yyyy-MM-dd HH:mm:ss") and advances the watermark to the latest timestamp seen.
     *
     * <p>NOTE(review): the watermark equals the newest timestamp, so zero out-of-orderness
     * is tolerated — any element arriving late in event time would be dropped by
     * event-time windows. Confirm this is intended.
     */
    private static class EstateInfoAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<EstateInfo> {

        // SimpleDateFormat is not thread-safe, but each parallel subtask owns its own
        // assigner instance, so a per-instance formatter is safe. Hoisted out of
        // extractTimestamp to avoid allocating a new formatter for every element.
        private final SimpleDateFormat format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");

        private Long time = null;

        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
            // FIX(review): the original unboxed "time" unconditionally and threw a
            // NullPointerException when the periodic watermark was requested before the
            // first element arrived. Report "no watermark yet" via Long.MIN_VALUE.
            return new Watermark(time == null ? Long.MIN_VALUE : time);
        }

        @SneakyThrows
        @Override
        public long extractTimestamp(EstateInfo element, long previousElementTimestamp) {
            time = format.parse(element.getCreateTimeFormat()).getTime();
            return time;
        }
    }
}
